package com.atguigu.handle

import java.{lang, util}
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.bean.StartUpLog
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.Jedis

object DauHandle {

  /**
    * In-batch deduplication: within one micro-batch, keep only the earliest
    * start-up record per device per day.
    *
    * @param filterByRedisDStream stream already deduplicated across batches
    * @return stream with at most one StartUpLog per (mid, logDate) per batch
    */
  def filterByGroup(filterByRedisDStream: DStream[StartUpLog]): DStream[StartUpLog] = {
    //1. Key each record by (mid, logDate); the stream must be KV-shaped
    //   before groupByKey can be called on it.
    val midWithLogDateToLogDStream: DStream[((String, String), StartUpLog)] =
      filterByRedisDStream.map(log => ((log.mid, log.logDate), log))

    //2. Group records of the same device on the same day, then
    //3. keep the record with the smallest timestamp. minBy is O(n) and
    //   replaces the original sortWith(_.ts < _.ts).take(1), which paid
    //   for a full O(n log n) sort just to pick the minimum.
    midWithLogDateToLogDStream
      .groupByKey()
      .map { case (_, logs) => logs.minBy(_.ts) }
  }

  /**
    * Cross-batch deduplication: drop records whose mid is already recorded
    * in Redis for today.
    *
    * Rejected alternatives kept for the record:
    *  - scheme 1: one Jedis connection per record inside filter() — far too
    *    many connections;
    *  - scheme 2: one connection per partition via mapPartitions — better,
    *    but still one connection per partition per batch.
    * Scheme 3 (implemented): one connection per batch on the driver; read
    * today's mid set once and broadcast it to the executors.
    *
    * NOTE(review): the Redis key here is built from the driver's current
    * date, while saveToRedis keys by log.logDate — around midnight a batch
    * could be checked against the wrong day's set. Confirm this is intended.
    *
    * @param startUpLogDStream raw start-up log stream
    * @param sc SparkContext used to broadcast the mid set
    * @return stream with already-seen mids filtered out
    */
  def filterByRedis(startUpLogDStream: DStream[StartUpLog], sc: SparkContext): DStream[StartUpLog] = {
    val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    startUpLogDStream.transform(rdd => {
      //1. Open a Redis connection on the driver, once per batch.
      val jedis: Jedis = new Jedis("hadoop102", 6379)

      //2. Read the mids already saved for today. close() sits in a finally
      //   block so the connection is released even if smembers throws.
      val redisKey: String = "Dau:" + sdf.format(new Date(System.currentTimeMillis()))
      val mids: util.Set[String] =
        try {
          jedis.smembers(redisKey)
        } finally {
          jedis.close()
        }

      //3. Ship the queried mid set to the executors.
      val midsBc: Broadcast[util.Set[String]] = sc.broadcast(mids)

      //4. Keep only records whose mid has not been seen today.
      rdd.filter(log => !midsBc.value.contains(log.mid))
    })
  }

  /**
    * Persist the mids of the deduplicated stream to Redis, one set per day
    * (key "Dau:<logDate>"), so later batches can filter against them.
    *
    * @param startUpLogDStream fully deduplicated start-up log stream
    */
  def saveToRedis(startUpLogDStream: DStream[StartUpLog]): Unit = {
    startUpLogDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // One connection per partition keeps the connection count low.
        val jedis: Jedis = new Jedis("hadoop102", 6379)
        try {
          partition.foreach(log => {
            val redisKey: String = "Dau:" + log.logDate
            // Record this device's mid under today's set.
            jedis.sadd(redisKey, log.mid)
          })
        } finally {
          // Always release the connection, even if sadd throws mid-partition.
          jedis.close()
        }
      })
    })
  }
}
